Implement load tests for Kafka #411

Merged
kibertoad merged 4 commits into main from feat/kafka-load-tests on Feb 27, 2026

Conversation

kibertoad (Owner) commented Feb 26, 2026

Summary by CodeRabbit

  • New Features

    • Added a comprehensive Kafka load-testing suite with CDC and Direct modes, single-message and batch consumption, CLI-driven scenarios, and Docker Compose for Kafka/CockroachDB/UIs.
    • Real-time metrics and latency reporting (avg, p50, p95, p99) with per-topic breakdown and drain/timeout handling.
  • Documentation

    • Included a README with quick-start, modes, CLI options, presets, architecture, and measurement details.
  • Chores

    • Enforced LF line endings for shell scripts.
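The latency figures called out above (avg, p50, p95, p99) are conventionally derived from a sorted sample of observed latencies. As a rough illustration of how such stats can be computed — a hypothetical helper, not the package's actual MetricsCollector:

```typescript
// Minimal latency-stats sketch: average plus nearest-rank percentiles.
// Hypothetical helper; the real MetricsCollector in this PR may differ.
export function percentile(sorted: number[], p: number): number {
  if (sorted.length === 0) return 0
  // Nearest-rank method: ceil(p% of n) - 1, clamped into the array bounds.
  const idx = Math.min(sorted.length - 1, Math.ceil((p / 100) * sorted.length) - 1)
  return sorted[Math.max(0, idx)]
}

export function latencyStats(latencies: number[]) {
  const sorted = [...latencies].sort((a, b) => a - b)
  const avg =
    sorted.length === 0 ? 0 : sorted.reduce((sum, v) => sum + v, 0) / sorted.length
  return {
    avg,
    p50: percentile(sorted, 50),
    p95: percentile(sorted, 95),
    p99: percentile(sorted, 99),
  }
}
```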

coderabbitai bot commented Feb 26, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c05da5e and 83dbb2b.

📒 Files selected for processing (16)
  • biome.json
  • packages/kafka/load-tests/package.json
  • packages/kafka/load-tests/scripts/init-crdb.sh
  • packages/kafka/load-tests/src/batch-load-generator.ts
  • packages/kafka/load-tests/src/cdc-batch-consumer.ts
  • packages/kafka/load-tests/src/cdc-consumer.ts
  • packages/kafka/load-tests/src/cdc-schemas.ts
  • packages/kafka/load-tests/src/crdb-client.ts
  • packages/kafka/load-tests/src/direct-batch-consumer.ts
  • packages/kafka/load-tests/src/direct-batch-load-generator.ts
  • packages/kafka/load-tests/src/direct-consumer.ts
  • packages/kafka/load-tests/src/direct-load-generator.ts
  • packages/kafka/load-tests/src/direct-publisher.ts
  • packages/kafka/load-tests/src/direct-schemas.ts
  • packages/kafka/load-tests/src/load-generator.ts
  • packages/kafka/load-tests/tsconfig.json

📝 Walkthrough

Adds a Kafka load-testing package with Dockerized infra, CockroachDB initialization, schema definitions, publishers/consumers (single and batch modes), load generators, metrics collection, and CLI runners for CDC and direct testing.

Changes

  • Package & Tooling (packages/kafka/load-tests/package.json, packages/kafka/load-tests/tsconfig.json, biome.json, packages/kafka/load-tests/.gitattributes): New package config, TypeScript config, lint include update, and .gitattributes enforcing LF for scripts.
  • Documentation (packages/kafka/load-tests/README.md): Comprehensive README describing modes, CLI, quick start, architecture, and latency measurement.
  • Docker & Init Scripts (packages/kafka/load-tests/docker-compose.yml, packages/kafka/load-tests/scripts/init-crdb.sh): Docker Compose for kafka/crdb/kafka-ui and an init script that creates the database/tables and conditionally creates a Kafka changefeed.
  • Configuration (packages/kafka/load-tests/src/config.ts): Centralizes bootstrap brokers, clientId, CRDB connection, reporting and drain timeouts.
  • Schemas (packages/kafka/load-tests/src/cdc-schemas.ts, packages/kafka/load-tests/src/direct-schemas.ts): Zod schemas and topic config arrays for CDC and direct event/order topics.
  • DB Client & Metrics (packages/kafka/load-tests/src/crdb-client.ts, packages/kafka/load-tests/src/metrics-collector.ts): CrdbClient for bulk inserts and MetricsCollector for per-topic counts and latency stats.
  • Consumers, CDC & Direct (packages/kafka/load-tests/src/cdc-consumer.ts, .../cdc-batch-consumer.ts, .../direct-consumer.ts, .../direct-batch-consumer.ts): Single-message and batch-capable Kafka consumer implementations wired to schemas and metrics.
  • Publishers & Generators (packages/kafka/load-tests/src/direct-publisher.ts, .../load-generator.ts, .../batch-load-generator.ts, .../direct-load-generator.ts, .../direct-batch-load-generator.ts): Publishers and load generators for CDC (via CRDB changefeed) and direct publish modes with rate control and draining logic.
  • CLI Runners (packages/kafka/load-tests/src/run.ts, .../run-batch.ts, .../run-direct.ts, .../run-direct-batch.ts): CLI entrypoints parsing flags and invoking the respective load-test functions.
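CLI entrypoints like these typically parse their flags with Node's built-in parseArgs. A hedged sketch of that pattern — the flag names below are illustrative, not necessarily the PR's actual options:

```typescript
import { parseArgs } from 'node:util'

// Hypothetical flag set for a load-test runner; names are illustrative only.
export function parseLoadTestFlags(args: string[]) {
  const { values } = parseArgs({
    args,
    options: {
      rate: { type: 'string', default: '1000' },     // target messages per second
      duration: { type: 'string', default: '30' },   // test duration in seconds
      batchSize: { type: 'string', default: '100' }, // rows/messages per batch
    },
  })
  // parseArgs only yields strings/booleans, so convert numerics explicitly.
  return {
    rate: Number(values.rate),
    durationSec: Number(values.duration),
    batchSize: Number(values.batchSize),
  }
}
```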

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CLI as CLI/run.ts
    participant LG as LoadGenerator
    participant MC as MetricsCollector
    participant CC as CrdbClient
    participant CDC as CdcConsumer
    participant Kafka as Kafka

    CLI->>LG: runLoadTest(options)
    LG->>MC: init
    LG->>CC: init
    LG->>CDC: init & start
    LG->>MC: startPeriodicReporting()

    loop generate batches
        LG->>CC: insertEvents(...)
        LG->>CC: insertOrders(...)
        CC->>Kafka: DB changefeed emits messages
        Kafka->>CDC: deliver messages
        CDC->>MC: recordConsumed(topic, loadtest_ts)
        MC->>MC: compute latency
        LG->>LG: rate throttling
    end

    LG->>CDC: drain backlog (with timeout)
    CDC->>MC: record remaining
    LG->>MC: printFinalReport()
    LG->>CDC: close
    LG->>CC: close
```
```mermaid
sequenceDiagram
    participant CLI as CLI/run-direct.ts
    participant LG as DirectLoadGenerator
    participant MC as MetricsCollector
    participant DP as DirectPublisher
    participant Kafka as Kafka
    participant DC as DirectConsumer

    CLI->>LG: runDirectLoadTest(options)
    LG->>MC: init
    LG->>DP: init
    LG->>DC: init & start
    LG->>MC: startPeriodicReporting()

    loop generate batches
        LG->>DP: publish direct-events/direct-orders
        DP->>Kafka: send messages (with loadtest_ts)
        MC->>MC: recordProduced(count)
        Kafka->>DC: deliver messages
        DC->>MC: recordConsumed(topic, loadtest_ts)
        MC->>MC: compute latency
        LG->>LG: rate throttling
    end

    LG->>DC: drain backlog (with timeout)
    DC->>MC: record remaining
    LG->>MC: printFinalReport()
    LG->>DC: close
    LG->>DP: close
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • PR #371: Batch stream handling changes to AbstractKafkaConsumer relied upon by the new batch consumer implementations.
  • PR #374: Schema/topic and publisher wrapping updates that the new schemas and publisher classes depend on.

Suggested labels

minor

Suggested reviewers

  • kjamrog
  • CarlosGamero

Poem

🐰 I hopped into code with a thump and a cheer,

sending messages far and wide to appear.
Kafka and CRDB twirl in a test,
metrics hop in — latency dressed.
Hooray for batches and throughput, my dear! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title check — ✅ Passed: the title 'Implement load tests for Kafka' accurately and concisely summarizes the primary change: adding a comprehensive load-testing suite for Kafka with CDC and direct publishing modes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (11)
packages/kafka/load-tests/README.md (2)

85-87: Add language specifier to fenced code blocks.

The ASCII diagrams should have a language specifier (e.g., text or plaintext) to satisfy markdown linting rules.

📝 Suggested fix
-```
+```text
 Load Generator → CockroachDB (inserts) → CDC Changefeed → Kafka → Consumer → Metrics
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/README.md` around lines 85 - 87, The fenced code
block containing the ASCII diagram lacks a language specifier; update the
triple-backtick fence around the diagram in the README (the block that currently
contains "Load Generator → CockroachDB (inserts) → CDC Changefeed → Kafka →
Consumer → Metrics") to include a language tag such as text or plaintext (e.g.,
```text) so markdown linting passes and the diagram is treated as plain text.

96-98: Add language specifier to fenced code block.

Same issue as the CDC diagram above.

📝 Suggested fix
-```
+```text
 Load Generator → AbstractKafkaPublisher → Kafka → Consumer → Metrics
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/README.md` around lines 96 - 98, The fenced code
block containing the arrow diagram "Load Generator → AbstractKafkaPublisher →
Kafka → Consumer → Metrics" is missing a language specifier; update the opening
fence from ``` to ```text so it becomes a text code block and preserves
formatting/escaping in Markdown and matches the CDC diagram fix pattern.
packages/kafka/load-tests/src/cdc-batch-consumer.ts (1)

73-73: Consider disabling logMessages for load tests.

logMessages: true in a high-throughput batch consumer will generate substantial log output during load testing. The non-batch CdcConsumer has this set to false. Unless verbose logging is specifically needed for debugging, consider setting this to false for consistency and reduced noise.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/cdc-batch-consumer.ts` at line 73, The
load-test batch consumer currently has logMessages: true which will produce
excessive output under high throughput; update the consumer config used by the
Cdc batch consumer (the object containing logMessages) to set logMessages: false
to match the non-batch CdcConsumer and reduce noise during load tests, ensuring
any places that instantiate or export that config (e.g., the CdcBatchConsumer
configuration object) use the new value.
packages/kafka/load-tests/docker-compose.yml (2)

29-30: Consider pinning CockroachDB image version for reproducibility.

Using latest-v25.1 is a floating tag that may change over time. For consistent load test results, consider pinning to a specific version (e.g., cockroachdb/cockroach:v25.1.0).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/docker-compose.yml` around lines 29 - 30, The
docker-compose service "cockroachdb" currently uses the floating tag
"cockroachdb/cockroach:latest-v25.1"; change this to a pinned immutable tag (for
example "cockroachdb/cockroach:v25.1.0" or another exact patch release) in the
"image" field so load tests are reproducible and don't shift when the upstream
tag changes.

56-57: Consider pinning kafka-ui image version.

provectuslabs/kafka-ui:latest may introduce breaking changes. Consider pinning to a specific version for stability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/docker-compose.yml` around lines 56 - 57, The
docker-compose service "kafka-ui" uses the floating image tag
provectuslabs/kafka-ui:latest which can introduce breaking changes; update the
kafka-ui service definition to pin the image to a specific, tested version
(e.g., provectuslabs/kafka-ui:<stable-version>) and document the chosen version;
locate the kafka-ui service entry and replace the :latest tag with the selected
semantic version or digest and update any related deployment notes or README to
record the pinned version for future upgrades.
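Taken together, the two pinning suggestions would look roughly like this in docker-compose.yml (the exact version numbers below are illustrative — pick releases you have actually tested):

```yaml
services:
  cockroachdb:
    # pinned patch release instead of the floating latest-v25.1 tag
    image: cockroachdb/cockroach:v25.1.0
  kafka-ui:
    # pinned release instead of :latest
    image: provectuslabs/kafka-ui:v0.7.2
```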
packages/kafka/load-tests/src/batch-load-generator.ts (2)

7-13: Duplicate interface definition with direct-batch-load-generator.ts.

BatchLoadTestOptions is identically defined in both this file and direct-batch-load-generator.ts (lines 9-15). Consider extracting to a shared types file to avoid drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/batch-load-generator.ts` around lines 7 - 13,
The BatchLoadTestOptions interface is duplicated; extract it into a shared types
module and import it from both places to avoid drift: create a new exported
interface named BatchLoadTestOptions in a common types file, remove the
duplicate declarations in batch-load-generator.ts and
direct-batch-load-generator.ts, and update both files to import {
BatchLoadTestOptions } from the new module so the same definition is referenced
everywhere.

49-58: Consider: totalInserted includes failed inserts, causing misleading final output.

totalInserted is incremented before the async insert completes (line 49), but metrics.recordProduced is only called on success (line 56). If inserts fail, the final log at line 74 will report more rows "inserted" than actually succeeded.

For a load test tool this may be acceptable, but for accurate reporting consider incrementing totalInserted inside the .then() callback or tracking failures separately.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/batch-load-generator.ts` around lines 49 - 58,
totalInserted is incremented before the async inserts complete, so failed
inserts are counted as successes; move the mutation of totalInserted into the
success path and/or maintain a separate failure counter: remove or stop updating
totalInserted before Promise.all, then inside the Promise.all .then() callback
(after crdb.insertEvents/crdb.insertOrders succeed and metrics.recordProduced is
called) increment totalInserted by currentBatch; additionally, in the .catch()
increment a new failedInserted (or failedBatches) metric so final reports
reflect actual successes vs failures; keep inflight-- in .finally() as-is.
packages/kafka/load-tests/src/direct-batch-load-generator.ts (2)

17-34: Duplicate helper functions: generateEvent and generateOrder.

These functions are identical to those in direct-load-generator.ts (lines 15-32). Consider extracting them to a shared module (e.g., direct-schemas.ts or a new test-data-generators.ts) to eliminate duplication.

♻️ Suggested approach

Create a shared module:

// test-data-generators.ts
export function generateEvent(index: number): DirectEvent { ... }
export function generateOrder(index: number): DirectOrder { ... }

Then import in both load generators.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/direct-batch-load-generator.ts` around lines 17
- 34, The helpers generateEvent and generateOrder are duplicated here and in
direct-load-generator.ts; extract both functions into a shared module (e.g.,
test-data-generators.ts or direct-schemas.ts), export them (generateEvent,
generateOrder), then import and use those exports in
direct-batch-load-generator.ts (and direct-load-generator.ts) to remove
duplication; ensure the exported functions keep the same signatures and return
types (DirectEvent, DirectOrder) so callers need no changes.
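An extracted shared module could look like the sketch below. The field names are illustrative stand-ins — the real DirectEvent/DirectOrder shapes live in direct-schemas.ts:

```typescript
// test-data-generators.ts (sketch): shared payload generators for both
// load generators. Field names are hypothetical, not the PR's real schemas.
export function generateEvent(index: number) {
  return {
    id: `event-${index}`,
    type: 'load-test-event',
    loadtest_ts: Date.now(), // consumers use this to compute end-to-end latency
  }
}

export function generateOrder(index: number) {
  return {
    id: `order-${index}`,
    amount: (index % 100) + 1, // deterministic but varied test data
    loadtest_ts: Date.now(),
  }
}
```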

1-103: Observation: Structural similarity with direct-load-generator.ts.

The overall structure closely mirrors direct-load-generator.ts. If more load test variants are planned, consider a factory or shared orchestration function parameterized by consumer type to reduce maintenance burden. For now, the duplication is manageable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/direct-batch-load-generator.ts` around lines 1
- 103, The file duplicates orchestration logic already in
direct-load-generator.ts; refactor by extracting the shared load-test
orchestration into a single function (e.g., runLoadTestCore) that accepts
injected components/factories and options, then modify runDirectBatchLoadTest to
call that core with DirectBatchConsumer, DirectPublisher, MetricsCollector and
the BatchLoadTestOptions; specifically, move common setup (MetricsCollector
creation, publisher/consumer init, publish loop, rate throttling, drain/wait
logic, reporting and shutdown) into the core, keep only the variant-specific
consumer/publisher construction and payload generators (generateEvent,
generateOrder) in this file, and replace direct calls to
consumer.init()/publisher.init(), consumer.close()/publisher.close(), and the
publish loop in runDirectBatchLoadTest with a call to the new core function so
both runDirectBatchLoadTest and the direct-load-generator counterpart can share
it.
packages/kafka/load-tests/src/load-generator.ts (1)

32-37: Informational: delayMs is computed but not used for throttling.

batchesPerSecond and delayMs are calculated and logged here, but the actual rate limiting (lines 67-74) uses a cumulative elapsed-time approach instead. This isn't wrong—the logged values provide useful context—but the two throttling models could confuse future maintainers.

Consider either:

  • Adding a comment clarifying that delayMs is informational only
  • Or removing the unused calculation
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/load-generator.ts` around lines 32 - 37, The
variables batchesPerSecond and delayMs in load-generator.ts are calculated only
for logging and not used for throttling (throttling is handled by the cumulative
elapsed-time logic later); update the code to avoid confusion by either removing
the unused delayMs calculation or adding a clarifying comment near the log
(around the console.log that prints batchesPerSecond and delayMs) that states
delayMs is informational only and the actual throttling is implemented by the
elapsed-time based loop (referencing batchesPerSecond and delayMs so reviewers
can find the lines).
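The cumulative elapsed-time throttling model the comment refers to can be sketched like this — a simplified illustration of the approach, not the PR's exact loop:

```typescript
// Cumulative elapsed-time rate control: instead of sleeping a fixed delayMs
// per batch, compute when the Nth batch *should* have gone out and sleep only
// the remaining difference. Slow iterations are thereby self-correcting.
export function throttleDelay(
  startMs: number,
  nowMs: number,
  batchesSent: number,
  batchesPerSecond: number,
): number {
  const targetElapsedMs = (batchesSent / batchesPerSecond) * 1000
  const actualElapsedMs = nowMs - startMs
  // Never negative: if we are behind schedule, proceed immediately.
  return Math.max(0, targetElapsedMs - actualElapsedMs)
}
```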
packages/kafka/load-tests/src/metrics-collector.ts (1)

7-7: Memory consideration: unbounded latencies array.

The latencies array grows with each consumed message that has a timestamp. For extended load tests with millions of messages, this could consume significant memory. Consider either:

  • Sampling latencies (e.g., record every Nth)
  • Using a streaming percentile algorithm (e.g., t-digest)
  • Periodically trimming old samples

For typical short-duration load tests, the current approach is fine.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/kafka/load-tests/src/metrics-collector.ts` at line 7, The latencies
array (latencies: number[]) is unbounded and can grow to O(messages), so replace
it with a bounded/summarizing structure: either implement sampling (add a
sampleRate config and only push to latencies when counter % sampleRate === 0),
or swap the array for a streaming percentile structure (e.g., a TDigest or
reservoir sampler instance) and call its add/merge methods where the code
currently pushes into latencies; also add an optional maxSamples cap and
eviction/rotation logic if you prefer trimming. Locate all places referencing
latencies (the latencies field and any methods that push to it) and change them
to use the new sampler/tDigest API or the sampleRate check; ensure reporting
functions that read latencies are updated to compute percentiles from the new
structure or from the sampled array.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/kafka/load-tests/package.json`:
- Around line 10-25: The package.json scripts run TypeScript files directly
(e.g., scripts "load:cdc", "load:cdc:light", "load:direct", "load:direct:batch"
that call node src/run.ts, src/run-batch.ts, src/run-direct.ts,
src/run-direct-batch.ts) which requires either Node 22.18.0+ or the
--experimental-strip-types flag for older 22.x; update each script that invokes
node on a .ts file to prefix the node command with --experimental-strip-types
(e.g., node --experimental-strip-types src/run.ts) or alternatively add a clear
README/engine field documenting Node >=22.18.0 as a requirement so CI and devs
know not to use older runtimes.

In `@packages/kafka/load-tests/scripts/init-crdb.sh`:
- Around line 36-42: Make the changefeed creation idempotent: before running the
CREATE CHANGEFEED for events, orders check for an existing changefeed and skip
or cancel it first; for example, query the cluster's changefeeds (e.g., SHOW
CHANGEFEEDS or inspect system jobs for job_type=CHANGEFEED) to find any
changefeed targeting events/orders and either CANCEL the job or skip creation,
then run the CREATE CHANGEFEED only if none exists; alternatively wrap the
CREATE step in error-handling in the script to catch the "already exists" error
and ignore it so repeated runs do not fail.

In `@packages/kafka/load-tests/src/cdc-batch-consumer.ts`:
- Line 10: Change the import so CDC_TOPICS_CONFIG is imported as a type-only
symbol; update the import line in cdc-batch-consumer.ts to keep CDC_EVENT_SCHEMA
and CDC_ORDER_SCHEMA as value imports but import CDC_TOPICS_CONFIG (and the
CdcEvent/CdcOrder types) with the type qualifier (e.g. import {
CDC_EVENT_SCHEMA, CDC_ORDER_SCHEMA, type CDC_TOPICS_CONFIG, type CdcEvent, type
CdcOrder } from './cdc-schemas.ts') so CDC_TOPICS_CONFIG is not treated as a
runtime value.

In `@packages/kafka/load-tests/src/cdc-consumer.ts`:
- Line 10: The import is mixing value and type imports; make CDC_TOPICS_CONFIG a
type-only import to match cdc-batch-consumer.ts: keep CDC_EVENT_SCHEMA and
CDC_ORDER_SCHEMA as value imports but import CDC_TOPICS_CONFIG (and the
CdcEvent/CdcOrder types) using an import type so CDC_TOPICS_CONFIG is not
emitted at runtime; update the import statement that currently references
CDC_EVENT_SCHEMA, CDC_ORDER_SCHEMA, CDC_TOPICS_CONFIG, CdcEvent, CdcOrder
accordingly.

In `@packages/kafka/load-tests/src/direct-schemas.ts`:
- Line 1: The file incorrectly uses a default import from Zod ("import z from
'zod/v4'") which doesn't exist; replace it with a valid import such as a named
import "import { z } from 'zod/v4'" or a namespace import "import * as z from
'zod'" so that subsequent uses of the z symbol (e.g., any calls to z.object,
z.string, etc.) resolve correctly; update the import line only and ensure no
other code changes are necessary.


ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 87b1614 and c05da5e.

⛔ Files ignored due to path filters (1)
  • packages/kafka/load-tests/package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (24)
  • packages/kafka/load-tests/.gitattributes
  • packages/kafka/load-tests/README.md
  • packages/kafka/load-tests/docker-compose.yml
  • packages/kafka/load-tests/package.json
  • packages/kafka/load-tests/scripts/init-crdb.sh
  • packages/kafka/load-tests/src/batch-load-generator.ts
  • packages/kafka/load-tests/src/cdc-batch-consumer.ts
  • packages/kafka/load-tests/src/cdc-consumer.ts
  • packages/kafka/load-tests/src/cdc-schemas.ts
  • packages/kafka/load-tests/src/config.ts
  • packages/kafka/load-tests/src/crdb-client.ts
  • packages/kafka/load-tests/src/direct-batch-consumer.ts
  • packages/kafka/load-tests/src/direct-batch-load-generator.ts
  • packages/kafka/load-tests/src/direct-consumer.ts
  • packages/kafka/load-tests/src/direct-load-generator.ts
  • packages/kafka/load-tests/src/direct-publisher.ts
  • packages/kafka/load-tests/src/direct-schemas.ts
  • packages/kafka/load-tests/src/load-generator.ts
  • packages/kafka/load-tests/src/metrics-collector.ts
  • packages/kafka/load-tests/src/run-batch.ts
  • packages/kafka/load-tests/src/run-direct-batch.ts
  • packages/kafka/load-tests/src/run-direct.ts
  • packages/kafka/load-tests/src/run.ts
  • packages/kafka/load-tests/tsconfig.json

```diff
@@ -0,0 +1,24 @@
+import z from 'zod/v4'
```

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

```sh
cat packages/kafka/package.json | grep -A 5 -B 5 '"zod"'
```

Repository: kibertoad/message-queue-toolkit

Length of output: 628


🌐 Web query:

Zod 4 correct import syntax zod/v4 documentation

💡 Result:

For Zod v4, you can import either way:

```typescript
// Recommended (since July 8, 2025: package root "zod" exports v4)
import * as z from "zod";
// Still supported forever (explicit v4 subpath)
import { z } from "zod/v4";
```
Zod’s versioning docs also note: Zod 3 is now imported via "zod/v3". [1] [2]

Sources: [1] [2]


Fix incorrect Zod import syntax.

The import uses import z from 'zod/v4' (default import), but Zod v4 does not export a default. Change to either import { z } from 'zod/v4' (named import) or import * as z from 'zod' (recommended modern syntax).


kibertoad merged commit cb25b9f into main on Feb 27, 2026
5 of 6 checks passed
@kibertoad kibertoad deleted the feat/kafka-load-tests branch February 27, 2026 09:38